/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.runners.fnexecution.control;



import com.bff.gaia.unified.model.fnexecution.v1.UnifiedFnApi;

import com.bff.gaia.unified.model.fnexecution.v1.UnifiedFnApi.InstructionRequest;

import com.bff.gaia.unified.sdk.fn.stream.SynchronizedStreamObserver;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.Status;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;

import com.bff.gaia.unified.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.io.Closeable;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.CompletionStage;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentMap;

import java.util.concurrent.atomic.AtomicBoolean;

import java.util.function.Consumer;



/**
 * A client for the control plane of an SDK harness, which can issue requests to it over the Fn API.
 *
 * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC layer.
 *
 * <p>The Fn API is inverted so the runner is the server and the SDK harness is the client, for
 * firewalling reasons (the runner may execute in a more privileged environment forbidding outbound
 * connections).
 *
 * <p>This low-level client is responsible only for correlating requests with responses.
 *
 * <p>The client closes at most once: either through {@link #close()}, or when the response
 * observer returned by {@link #asResponseObserver()} receives {@code onCompleted} or {@code
 * onError}. Closing completes every outstanding request future exceptionally, and any request
 * submitted to {@link #handle} afterwards is failed immediately instead of being left pending.
 */
public class FnApiControlClient implements Closeable, InstructionRequestHandler {
  private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);

  // All writes to this StreamObserver need to be synchronized; the constructor wraps the
  // caller-supplied observer with SynchronizedStreamObserver.wrapping(...) for that purpose.
  private final StreamObserver<UnifiedFnApi.InstructionRequest> requestReceiver;
  private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
  // Futures for requests that were sent but not yet answered, keyed by instruction id.
  private final ConcurrentMap<String, CompletableFuture<UnifiedFnApi.InstructionResponse>>
      outstandingRequests;
  private final Set<Consumer<FnApiControlClient>> onCloseListeners = ConcurrentHashMap.newKeySet();
  private final String workerId;
  // Flipped to true exactly once by closeAndTerminateOutstandingRequests.
  private final AtomicBoolean isClosed = new AtomicBoolean(false);

  private FnApiControlClient(String workerId, StreamObserver<InstructionRequest> requestReceiver) {
    this.workerId = workerId;
    this.requestReceiver = SynchronizedStreamObserver.wrapping(requestReceiver);
    this.outstandingRequests = new ConcurrentHashMap<>();
  }

  /**
   * Returns a {@link FnApiControlClient} which will submit its requests to the provided observer.
   *
   * <p>It is the responsibility of the caller to register this object as an observer of incoming
   * responses (this will generally be done as part of fulfilling the contract of a gRPC service).
   *
   * @param workerId identifier reported by {@link #getWorkerId()}
   * @param requestObserver observer that receives every request passed to {@link #handle}
   */
  public static FnApiControlClient forRequestObserver(
      String workerId, StreamObserver<UnifiedFnApi.InstructionRequest> requestObserver) {
    return new FnApiControlClient(workerId, requestObserver);
  }

  /**
   * Sends {@code request} to the request observer and returns a stage that completes with the
   * response carrying the same instruction id.
   *
   * <p>If this client is already closed, or is closed while the request is being registered, the
   * request is not sent and the returned stage is completed exceptionally with an {@link
   * IllegalStateException}.
   *
   * @param request the instruction to send; its instruction id is the correlation key
   * @return a stage completed normally with the matching response, or exceptionally if the
   *     response reports an error or this client is closed before a response arrives
   * @throws RuntimeException whatever the request observer's {@code onNext} throws; the request
   *     is unregistered before the exception is propagated
   */
  @Override
  public CompletionStage<UnifiedFnApi.InstructionResponse> handle(
      UnifiedFnApi.InstructionRequest request) {
    CompletableFuture<UnifiedFnApi.InstructionResponse> resultFuture = new CompletableFuture<>();
    String instructionId = request.getInstructionId();

    // Fail fast: after close nothing would ever complete a newly registered future.
    if (isClosed.get()) {
      resultFuture.completeExceptionally(newClosedException(instructionId));
      return resultFuture;
    }

    LOG.debug("Sending InstructionRequest {}", request);
    outstandingRequests.put(instructionId, resultFuture);

    // Re-check after registering: a concurrent close may have taken its snapshot of the
    // outstanding requests before the put above, in which case it will never see this future.
    // completeExceptionally is a no-op if the close already completed it.
    if (isClosed.get()) {
      outstandingRequests.remove(instructionId, resultFuture);
      resultFuture.completeExceptionally(newClosedException(instructionId));
      return resultFuture;
    }

    try {
      requestReceiver.onNext(request);
    } catch (RuntimeException e) {
      // The caller never receives the future on this path, so do not leave it registered.
      outstandingRequests.remove(instructionId, resultFuture);
      resultFuture.completeExceptionally(e);
      throw e;
    }
    return resultFuture;
  }

  /** Builds the failure used for requests submitted to an already closed client. */
  private IllegalStateException newClosedException(String instructionId) {
    return new IllegalStateException(
        String.format(
            "Cannot send instruction %s: %s for worker %s is closed",
            instructionId, FnApiControlClient.class.getSimpleName(), workerId));
  }

  /** Returns the observer that must be fed the responses coming back from the SDK harness. */
  public StreamObserver<UnifiedFnApi.InstructionResponse> asResponseObserver() {
    return responseObserver;
  }

  /**
   * Closes this client from the runner side, failing all outstanding requests. Calls after the
   * first close (from any source) have no effect.
   */
  @Override
  public void close() {
    closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
  }

  /** Returns the worker id this client was created with. */
  public String getWorkerId() {
    return workerId;
  }

  /**
   * Closes this client and terminates any outstanding requests exceptionally.
   *
   * <p>Only the first invocation does any work. The request observer is completed normally when
   * there is nothing outstanding, and with a {@code CANCELLED} status otherwise. Registered
   * on-close listeners are always invoked, and outstanding futures are always completed, even if
   * signalling the request observer throws.
   *
   * @param cause the failure handed to every outstanding request future
   */
  private void closeAndTerminateOutstandingRequests(Throwable cause) {
    if (isClosed.getAndSet(true)) {
      return;
    }

    try {
      // Make a copy of the map to make the view of the outstanding requests consistent.
      Map<String, CompletableFuture<UnifiedFnApi.InstructionResponse>> outstandingRequestsCopy =
          new ConcurrentHashMap<>(outstandingRequests);
      outstandingRequests.clear();

      if (outstandingRequestsCopy.isEmpty()) {
        requestReceiver.onCompleted();
        return;
      }

      try {
        requestReceiver.onError(
            new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
      } finally {
        // Runs even if onError throws, so callers waiting on these futures are never left
        // hanging.
        LOG.error(
            "{} closed, clearing outstanding requests {}",
            FnApiControlClient.class.getSimpleName(),
            outstandingRequestsCopy);
        for (CompletableFuture<UnifiedFnApi.InstructionResponse> outstandingRequest :
            outstandingRequestsCopy.values()) {
          outstandingRequest.completeExceptionally(cause);
        }
      }
    } finally {
      for (Consumer<FnApiControlClient> onCloseListener : onCloseListeners) {
        onCloseListener.accept(this);
      }
    }
  }

  /**
   * Registers a listener that is invoked with this client when it closes.
   *
   * <p>Listeners are only invoked from the close path; one registered after the client has already
   * closed is stored but not called.
   */
  public void onClose(Consumer<FnApiControlClient> onCloseListener) {
    onCloseListeners.add(onCloseListener);
  }

  /** A private view of this class as a {@link StreamObserver} for connecting as a gRPC listener. */
  private class ResponseStreamObserver implements StreamObserver<UnifiedFnApi.InstructionResponse> {
    /**
     * Processes an incoming {@link UnifiedFnApi.InstructionResponse} by correlating it with the
     * corresponding {@link UnifiedFnApi.InstructionRequest} and completes the future that was
     * returned by {@link FnApiControlClient#handle}.
     *
     * <p>A response with a non-empty error completes the future exceptionally. A response whose
     * instruction id has no outstanding request is ignored.
     */
    @Override
    public void onNext(UnifiedFnApi.InstructionResponse response) {
      LOG.debug("Received InstructionResponse {}", response);
      CompletableFuture<UnifiedFnApi.InstructionResponse> responseFuture =
          outstandingRequests.remove(response.getInstructionId());
      if (responseFuture != null) {
        if (response.getError().isEmpty()) {
          responseFuture.complete(response);
        } else {
          responseFuture.completeExceptionally(
              new RuntimeException(
                  String.format(
                      "Error received from SDK harness for instruction %s: %s",
                      response.getInstructionId(), response.getError())));
        }
      }
    }

    /** Closes the enclosing client when the response stream completes. */
    @Override
    public void onCompleted() {
      closeAndTerminateOutstandingRequests(
          new IllegalStateException("SDK harness closed connection"));
    }

    /** Logs the stream failure and closes the enclosing client with {@code cause}. */
    @Override
    public void onError(Throwable cause) {
      // SLF4J treats a trailing Throwable as the exception to log (with stack trace), not as a
      // format argument, so the message must not reserve a placeholder for it.
      LOG.error("{} received error", FnApiControlClient.class.getSimpleName(), cause);
      closeAndTerminateOutstandingRequests(cause);
    }
  }
}